1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.async.socket;
20 import core.stdc.stdint;
21 import core.thread : Fiber;
22 import core.time : dur, Duration;
23 import std.array : empty;
24 import std.conv : to;
25 import std.exception : enforce;
26 import std.socket;
27 import thrift.base;
28 import thrift.async.base;
29 import thrift.transport.base;
30 import thrift.transport.socket : TSocketBase;
31 import thrift.internal.endian;
32 import thrift.internal.socket;
33 
34 version (Windows) {
35   import core.sys.windows.winsock2 : connect;
36 } else version (Posix) {
37   import core.sys.posix.sys.socket : connect;
38 } else static assert(0, "Don't know connect on this platform.");
39 
40 /**
41  * Non-blocking socket implementation of the TTransport interface.
42  *
43  * Whenever a socket operation would block, TAsyncSocket registers a callback
44  * with the specified TAsyncSocketManager and yields.
45  *
46  * As for thrift.transport.socket, due to the limitations of std.socket,
47  * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
48  * not).
49  */
class TAsyncSocket : TSocketBase, TAsyncTransport {
  /**
   * Constructor that takes an already created, connected (!) socket.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   socket = Already created, connected socket object. Will be switched to
   *     non-blocking mode if it isn't already.
   */
  this(TAsyncSocketManager asyncManager, Socket socket) {
    asyncManager_ = asyncManager;
    socket.blocking = false;
    super(socket);
  }

  /**
   * Creates a new unconnected socket that will connect to the given host
   * on the given port.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   host = Remote host.
   *   port = Remote port.
   */
  this(TAsyncSocketManager asyncManager, string host, ushort port) {
    asyncManager_ = asyncManager;
    super(host, port);
  }

  /// The async manager this socket registers its listeners and work items
  /// with.
  override TAsyncManager asyncManager() @property {
    return asyncManager_;
  }

  /**
   * Asynchronously connects the socket.
   *
   * Completes without blocking and defers further operations on the socket
   * until the connection is established. If connecting fails in the
   * background, this is currently not indicated in any way other than every
   * call to read/write failing.
   *
   * Throws: TTransportException (NOT_OPEN) if host or port are not set, if
   *   the host cannot be resolved, or if connect() fails right away with
   *   anything other than the "in progress" error code.
   */
  override void open() {
    if (isOpen) return;

    enforce(!host_.empty, new TTransportException(
      "Cannot open null host.", TTransportException.Type.NOT_OPEN));
    enforce(port_ != 0, new TTransportException(
      "Cannot open with null port.", TTransportException.Type.NOT_OPEN));

    // Cannot use std.socket.Socket.connect here because it hides away
    // EINPROGRESS/WSAWOULDBLOCK.
    Address addr;
    try {
      // Currently, we just go with the first address returned, could be made
      // more intelligent though – IPv6?
      auto candidates = getAddress(host_, port_);
      // Indexing an empty array would raise a RangeError, which is not an
      // Exception and would escape the handler below; turn that case into a
      // regular resolution failure instead.
      enforce(!candidates.empty, "No addresses returned for host.");
      addr = candidates[0];
    } catch (Exception e) {
      throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }

    socket_ = new TcpSocket(addr.addressFamily);
    socket_.blocking = false;
    setSocketOpts();

    auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen());
    if (errorCode == 0) {
      // If the connection could be established immediately, just return. I
      // don't know if this ever happens.
      return;
    }

    // Read the error code before touching the socket again, so that closing
    // it below cannot overwrite the value we want to report.
    auto connectErrno = getSocketErrno();
    if (connectErrno != CONNECT_INPROGRESS_ERRNO) {
      // Do not keep the unconnected socket around after a hard failure.
      closeImmediately();
      throw new TTransportException(`Could not establish connection to "` ~
        host_ ~ `": ` ~ socketErrnoString(connectErrno),
        TTransportException.Type.NOT_OPEN);
    }

    // This is the expected case: connect() signalled that the connection
    // is being established in the background. Queue up a work item with the
    // async manager which just defers any other operations on this
    // TAsyncSocket instance until the socket is ready.
    asyncManager_.execute(this,
      {
        auto fiber = Fiber.getThis();
        TAsyncEventReason reason = void;
        asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
          connectTimeout,
          scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
        );
        Fiber.yield();

        if (reason == TAsyncEventReason.TIMED_OUT) {
          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }

        // The socket became writable; SO_ERROR tells whether the background
        // connect actually succeeded.
        int32_t errorCode = void;
        socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR, errorCode);

        if (errorCode) {
          logInfo("Could not connect TAsyncSocket: %s",
            socketErrnoString(errorCode));

          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }

      }
    );
  }

  /**
   * Closes the socket.
   *
   * Will block until all currently active operations are finished before the
   * socket is closed.
   */
  override void close() {
    if (!isOpen) return;

    import core.sync.condition;
    import core.sync.mutex;

    auto doneMutex = new Mutex;
    auto doneCond = new Condition(doneMutex);
    // Set by the work item under doneMutex. Waiting on this flag instead of
    // on the bare condition guards against spurious wakeups and against the
    // notification being issued before wait() is entered.
    bool done = false;
    synchronized (doneMutex) {
      asyncManager_.execute(this,
        scopedDelegate(
          {
            closeImmediately();
            synchronized (doneMutex) {
              done = true;
              doneCond.notifyAll();
            }
          }
        )
      );
      while (!done) doneCond.wait();
    }
  }

  /**
   * Checks for pending data by peeking a single byte from the socket.
   *
   * Returns: true if at least one byte could be peeked, false if the socket
   *   is not open or the peek returned no data.
   *
   * Throws: TTransportException (UNKNOWN) if the receive call reports an
   *   error (other than ECONNRESET on platforms where
   *   connresetOnPeerShutdown is set, which closes the socket and yields
   *   false).
   *
   * NOTE(review): unlike read(), this does not go through yieldOnBlock(), so
   * on the non-blocking socket a "would block" result (no data pending yet)
   * ends up in the exception path – confirm whether that is intended.
   */
  override bool peek() {
    if (!isOpen) return false;

    ubyte buf;
    auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK);
    if (r == Socket.ERROR) {
      auto lastErrno = getSocketErrno();
      static if (connresetOnPeerShutdown) {
        if (lastErrno == ECONNRESET) {
          closeImmediately();
          return false;
        }
      }
      throw new TTransportException("Peeking into socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
    return (r > 0);
  }

  /**
   * Reads up to buf.length bytes into buf, yielding the current fiber to the
   * async manager while the socket would block.
   *
   * Returns: The number of bytes received (0 is also returned for
   *   ECONNRESET on platforms where connresetOnPeerShutdown is set).
   *
   * Throws: TTransportException – NOT_OPEN if the socket is not open,
   *   TIMED_OUT if the receive timeout expired while waiting, UNKNOWN for
   *   any other receive error.
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.receive(cast(void[])buf),
      TAsyncEventType.READ);

    // If recv went fine, immediately return.
    if (r >= 0) return r;

    // Something went wrong, find out how to handle it.
    auto lastErrno = getSocketErrno();

    static if (connresetOnPeerShutdown) {
      // On platforms where connresetOnPeerShutdown is set, a connection
      // reset is reported to the caller as a zero-length read rather than
      // as an error.
      if (lastErrno == ECONNRESET) {
        return 0;
      }
    }

    throw new TTransportException("Receiving from socket failed: " ~
      socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
  }

  /**
   * Writes all of buf to the socket by calling writeSome() until every byte
   * has been sent.
   */
  override void write(in ubyte[] buf) {
    size_t sent;
    while (sent < buf.length) {
      sent += writeSome(buf[sent .. $]);
    }
    assert(sent == buf.length);
  }

  /**
   * Sends as much of buf as a single send call accepts, yielding the current
   * fiber to the async manager while the socket would block.
   *
   * Returns: The number of bytes written (always greater than zero).
   *
   * Throws: TTransportException – NOT_OPEN if the socket is not open or the
   *   error code indicates that the connection was closed (in which case the
   *   socket is closed as well), TIMED_OUT if the send timeout expired while
   *   waiting, UNKNOWN for any other send error or a zero-byte send.
   */
  override size_t writeSome(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);

    // Everything went well, just return the number of bytes written.
    if (r > 0) return r;

    // Handle error conditions.
    if (r < 0) {
      auto lastErrno = getSocketErrno();

      auto type = TTransportException.Type.UNKNOWN;
      if (isSocketCloseErrno(lastErrno)) {
        type = TTransportException.Type.NOT_OPEN;
        closeImmediately();
      }

      throw new TTransportException("Sending to socket failed: " ~
        socketErrnoString(lastErrno), type);
    }

    // send() should never return 0.
    throw new TTransportException("Sending to socket failed (0 bytes written).",
      TTransportException.Type.UNKNOWN);
  }

  /// The amount of time in which a connection must be established before the
  /// open() call times out.
  Duration connectTimeout = dur!"seconds"(5);

private:
  /// Closes the underlying socket right away and drops the reference to it.
  /// Does nothing if the socket has already been released, so that several
  /// failure paths (or a close() queued behind a failed operation) can call
  /// it safely.
  void closeImmediately() {
    if (socket_ is null) return;
    socket_.close();
    socket_ = null;
  }

  /// Evaluates the lazy socket call, and for as long as it fails with the
  /// "would block" error code, registers a one-shot listener for eventType
  /// with the async manager, yields the current fiber and retries once the
  /// listener has fired.
  ///
  /// Returns: The first result of call that is not a "would block" error.
  ///
  /// Throws: TTransportException (TIMED_OUT) if the listener reports a
  ///   timeout; the socket is closed before throwing.
  T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
    while (true) {
      auto result = call();
      if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result;

      // We got an EAGAIN result, register a callback to return here once some
      // event happens and yield.

      Duration timeout = void;
      final switch (eventType) {
        case TAsyncEventType.READ:
          timeout = recvTimeout_;
          break;
        case TAsyncEventType.WRITE:
          timeout = sendTimeout_;
          break;
      }

      auto fiber = Fiber.getThis();
      assert(fiber, "Current fiber null – not running in TAsyncManager?");
      TAsyncEventReason eventReason = void;
      asyncManager_.addOneshotListener(socket_, eventType, timeout,
        scopedDelegate((TAsyncEventReason reason) {
          eventReason = reason;
          fiber.call();
        })
      );

      // Yields execution back to the async manager, will return back here once
      // the above listener is called.
      Fiber.yield();

      if (eventReason == TAsyncEventReason.TIMED_OUT) {
        // If we are cancelling the request due to a timed out operation, the
        // connection is in an undefined state, because the server could decide
        // to send the requested data later, or we could have already been half-
        // way into writing a request. Thus, we close the connection to make any
        // possibly queued up work items fail immediately. Besides, the server
        // is not very likely to immediately recover after a socket-level
        // timeout has expired anyway.
        closeImmediately();

        throw new TTransportException("Timed out while waiting for socket " ~
          "to get ready to " ~ to!string(eventType) ~ ".",
          TTransportException.Type.TIMED_OUT);
      }
    }
  }

  /// The TAsyncSocketManager to use for non-blocking I/O.
  TAsyncSocketManager asyncManager_;
}
336 
private {
  // SO_ERROR socket option value per platform; defined here because
  // std.socket does not expose it.
  version (linux) enum SO_ERROR = 4;
  else version (OSX) enum SO_ERROR = 0x1007;
  else version (FreeBSD) enum SO_ERROR = 0x1007;
  else version (Windows) import core.sys.windows.winsock2 : SO_ERROR;
  else static assert(false, "Don't know SO_ERROR on this platform.");

  // Identity helper whose only purpose is to take its argument as a `scope`
  // parameter: this forces a delegate literal passed through it to be scoped,
  // even when the final receiver accepts ordinary delegates too. DMD likes to
  // allocate the context on the heap anyway, but it seems to work for LDC.
  import std.traits : isDelegate;
  D scopedDelegate(D)(scope D dg) if (isDelegate!D) {
    return dg;
  }
}